package com.flink.examples.mysql;

import com.flink.examples.TUser;
import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink;
import org.apache.flink.table.api.*;
import org.apache.flink.types.Row;

/**
 * @Description Inserts Table data into a MySQL table
 * @Author JL
 * @Date 2020/09/17
 * @Version V1.0
 */
public class TableSink {

    /** Parameterized INSERT run by the JDBC sink; one placeholder per schema field, in schema order. */
    private static final String INSERT_SQL =
            "insert into t_user (id,name,age,sex,address,createTimeSeries) values (?,?,?,?,?,?)";

    /** Name under which the JDBC sink is registered in the table environment. */
    private static final String SINK_NAME = "result";

    /**
     * Builds a single sample {@link TUser}, converts it to a one-row {@link Table} and inserts it
     * into the {@code t_user} MySQL table through a {@link JDBCAppendTableSink}.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job cannot be submitted or fails while running
     */
    public static void main(String[] args) throws Exception {
        EnvironmentSettings bbSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inBatchMode()
                .build();
        TableEnvironment tEnv = TableEnvironment.create(bbSettings);

        // Sample entity to insert.
        TUser user = new TUser();
        user.setId(0);
        user.setName("zhao2");
        user.setAge(22);
        user.setSex(1);
        user.setAddress("CN");
        user.setCreateTimeSeries(System.currentTimeMillis());

        // Field names and types of the table; the order must match the placeholders in INSERT_SQL.
        TableSchema tableSchema = TableSchema.builder()
                .field("id", DataTypes.INT())
                .field("name", DataTypes.STRING())
                .field("age", DataTypes.INT())
                .field("sex", DataTypes.INT())
                .field("address", DataTypes.STRING())
                .field("createTimeSeries", DataTypes.BIGINT())
                .build();

        // Copy the entity into a Row; the arity equals the number of schema fields.
        Row row = new Row(tableSchema.getFieldCount());
        row.setField(0, user.getId());
        row.setField(1, user.getName());
        row.setField(2, user.getAge());
        row.setField(3, user.getSex());
        row.setField(4, user.getAddress());
        row.setField(5, user.getCreateTimeSeries());

        // Turn the row into a Table typed by the schema above.
        Table table = tEnv.fromValues(tableSchema.toRowDataType(), row);

        // JDBC sink writing to MySQL. Declared with its concrete type instead of the raw
        // org.apache.flink.table.sinks.TableSink interface to avoid a raw-type reference.
        JDBCAppendTableSink jdbcSink = JDBCAppendTableSink.builder()
                .setDrivername(MysqlConfig.DRIVER_CLASS)
                .setDBUrl(MysqlConfig.SOURCE_DRIVER_URL)
                .setUsername(MysqlConfig.SOURCE_USER)
                .setPassword(MysqlConfig.SOURCE_PASSWORD)
                .setQuery(INSERT_SQL)
                .setParameterTypes(tableSchema.getFieldTypes())
                .setBatchSize(100)
                .build();

        // Register the sink in the table environment under SINK_NAME.
        tEnv.registerTableSink(SINK_NAME,
                tableSchema.getFieldNames(),
                tableSchema.getFieldTypes(),
                jdbcSink);

        // Insert the table into the registered sink. Per the Flink 1.11 Table API documentation the
        // job is submitted asynchronously, so block until it finishes; otherwise main() may return
        // (and a local run may shut down) before the row is written.
        // NOTE(review): this uses the Flink 1.11 JobClient signature; on Flink 1.12+ the equivalent
        // is TableResult#await() - confirm against the project's Flink version.
        TableResult insertResult = table.executeInsert(SINK_NAME);
        insertResult.getJobClient()
                .orElseThrow(() -> new IllegalStateException("No JobClient available for the insert job"))
                .getJobExecutionResult(Thread.currentThread().getContextClassLoader())
                .get();
    }

}
